package Source;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import processFunction.UserBehavior;
import processFunction.itemPvViewCount;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
/*
实时热门商品统计
基本需求
统计近1个小时内的热门商品，每五分钟更新一次
热门度使用浏览（“pv”）来衡量
解决思路
在所有用户行为数据中，过滤出浏览（“pv”）行为进行统计
构建滑动窗口，窗口长度为1小时，滑动距离为5分钟，统计出每一种商品的访问数
再根据滑动窗口的时间，统计出访问次数最多的5个商品
 */

/**
 * Flink job that ranks the most-viewed items in near real time.
 *
 * <p>Pipeline: Kafka topic {@code userBehavior} (JSON strings) -> {@link UserBehavior} ->
 * event-time timestamps and watermarks -> keep only {@code "pv"} events -> per-item count over a
 * 1-hour window sliding every 5 minutes -> per-window top-N ranking printed to stdout.
 */
public class json {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job graph cannot be built or execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Execution environment with a global parallelism of 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Kafka source.
        // Kafka stores records as raw bytes. The DeserializationSchema passed to FlinkKafkaConsumer
        // (here the built-in SimpleStringSchema) turns each record value into a String. A custom
        // DeserializationSchema / KafkaDeserializationSchema can be supplied for other formats.
        Properties properties = new Properties();
        // Kafka cluster connection. Only consumer settings belong here: producer options such as
        // key/value.serializer are not consumer configs, and decoding is done by the schema above.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        // NOTE(review): no group.id is configured; set ConsumerConfig.GROUP_ID_CONFIG if offsets
        // should be tracked under a Kafka consumer group.

        FlinkKafkaConsumer<String> flinkKafkaConsumer =
                new FlinkKafkaConsumer<>("userBehavior", new SimpleStringSchema(), properties);
        DataStreamSource<String> kafkaSource = env.addSource(flinkKafkaConsumer);

        // 3. Parse each JSON record into a UserBehavior. Example input:
        //    {"user_id":124393,"item_id":2707570,"category_id":3898483,"behavior":"pv","timestamp":1511534024000}
        //    -> UserBehavior(user_id, item_id, category_id, behavior, timestamp)
        SingleOutputStreamOperator<UserBehavior> mapStream = kafkaSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                Long userId = jsonObject.getLong("user_id");
                Long itemId = jsonObject.getLong("item_id");
                Integer categoryId = jsonObject.getInteger("category_id");
                String behavior = jsonObject.getString("behavior");
                Long timestamp = jsonObject.getLong("timestamp");
                return new UserBehavior(userId, itemId, categoryId, behavior, timestamp);
            }
        });

        // 4. Event time is taken from the record's "timestamp" field (presumably epoch millis, as in
        //    the sample record above). The watermarks allow no out-of-orderness.
        SingleOutputStreamOperator<UserBehavior> watermarkStream = mapStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                            @Override
                            public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        }));

        // 5. Keep only page views ("pv") and key them by item id. Count each item per sliding window
        //    (1 hour long, advancing every 5 minutes); the window function attaches the window bounds.
        SingleOutputStreamOperator<itemPvViewCount> aggregateStream = watermarkStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .keyBy(data -> data.getItemId())
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
                .aggregate(new itemPVSum(), new itemPvViewResult());

        // 6. Gather all counts of the same window (keyed by window end) and rank them.
        SingleOutputStreamOperator<String> result = aggregateStream
                .keyBy(data -> data.windowEnd)
                .process(new topN(5));

        result.print();

        env.execute();
    }

    /** Incremental aggregate that counts the events of one item within one window. */
    public static class itemPVSum implements AggregateFunction<UserBehavior, Long, Long> {

        /** Every count starts at zero. */
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        /** Adds one per event; the event's content is not inspected. */
        @Override
        public Long add(UserBehavior value, Long accumulator) {
            return accumulator + 1L;
        }

        /** The final result is the count itself. */
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        /** Combines two partial counts. */
        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /**
     * Window function that wraps the single pre-aggregated count of a window with its item id and
     * window bounds.
     */
    public static class itemPvViewResult extends ProcessWindowFunction<Long, itemPvViewCount, Long, TimeWindow> {

        /**
         * Emits one {@code itemPvViewCount(item, count, windowStart, windowEnd)}.
         *
         * @param aLong    the item id (the key used by {@code keyBy} in {@code main})
         * @param context  gives access to the window's start and end
         * @param elements contains the single count produced by {@link itemPVSum}
         * @param out      receives the wrapped count
         */
        @Override
        public void process(Long aLong, ProcessWindowFunction<Long, itemPvViewCount, Long, TimeWindow>.Context context, Iterable<Long> elements, Collector<itemPvViewCount> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect(new itemPvViewCount(aLong, elements.iterator().next(), start, end));
        }
    }

    /**
     * Keyed by window end. Buffers every item count of a window and, once the event-time watermark
     * has passed that window's end, emits a text ranking of the {@code n} most-viewed items.
     */
    public static class topN extends KeyedProcessFunction<Long, itemPvViewCount, String> {

        /** How many items to include in each ranking. */
        private final int n;

        /** Counts of the current key's window, buffered until that window's timer fires. */
        public ListState<itemPvViewCount> itemViewCountListState;

        /** @param n how many top items to emit per window */
        public topN(int n) {
            this.n = n;
        }

        /** Obtains the keyed list-state handle. */
        @Override
        public void open(Configuration parameters) throws Exception {
            itemViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<itemPvViewCount>("item", Types.POJO(itemPvViewCount.class)));
        }

        @Override
        public void processElement(itemPvViewCount value, KeyedProcessFunction<Long, itemPvViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            // Buffer the count until the whole window is complete.
            itemViewCountListState.add(value);
            // The key is the window end, which is an event-time value. An *event-time* timer at
            // windowEnd + 1 fires only once the watermark has passed the window end. By then the
            // upstream window operator has emitted every item's count for this window. A
            // processing-time timer at this timestamp would already lie in the past and fire almost
            // immediately, producing partial rankings. Re-registering the same timestamp for the
            // same key is deduplicated by the timer service.
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, itemPvViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the buffered counts into a sortable list, then release the state.
            List<itemPvViewCount> counts = new ArrayList<>();
            for (itemPvViewCount pvCount : itemViewCountListState.get()) {
                counts.add(pvCount);
            }
            itemViewCountListState.clear();

            // Descending by count. Long.compare avoids the overflow/truncation of casting a long
            // difference to int.
            counts.sort((a, b) -> Long.compare(b.count, a.count));

            // Render the top-n entries; the timer fires at windowEnd + 1, so timestamp - 1 is the window end.
            StringBuilder result = new StringBuilder();
            result.append("==============================\n");
            result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");
            int limit = Math.min(this.n, counts.size());
            for (int i = 0; i < limit; i++) {
                itemPvViewCount entry = counts.get(i);
                result.append("No." + (i + 1) + " "
                        + "商品id" + entry.item + " "
                        + "浏览量：" + entry.count + "\n");
            }
            result.append("==============================\n");
            out.collect(result.toString());
        }
    }

}
